package com.example.demo.batch;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

public class CoGourpDemo {

    /**
     * 打印结果
     * {1=xiaoming}
     * {2=xiaowang}
     * {2=xiaowang xiaoli}
     * {1=xiaoming shinelon}
     * {3=hhhhhh}
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<Long, String>> source1 = env.fromElements(Tuple2.of(1L, "xiaoming"), Tuple2.of(2L, "xiaowang"));
        DataStream<Tuple2<Long, String>> source2 = env.fromElements(Tuple2.of(2L, "xiaoli"), Tuple2.of(1L, "shinelon"), Tuple2.of(3L, "hhhhhh"));
        // 组合流算子 按照规则组合(不能组合的 会自动分为一类)
        CoGroupedStreams<Tuple2<Long, String>, Tuple2<Long, String>>.Where<Long>.EqualTo equalTo = source1.coGroup(source2).where(item -> item.f0).equalTo(item2 -> item2.f0);
        // 时间窗口分组 每三秒钟进行一次计算
        equalTo.window(ProcessingTimeSessionWindows.withGap(Time.seconds(3)))
                // 触发器 key大于一个触发
                .trigger(CountTrigger.of(1))
                // 分组后进行聚合输出
                .apply(new CoGroupFunction<Tuple2<Long, String>, Tuple2<Long, String>, Map<Long, String>>() {
            @Override
            public void coGroup(Iterable<Tuple2<Long, String>> iterable, Iterable<Tuple2<Long, String>> iterable1, Collector<Map<Long, String>> collector) {
                Map<Long, String> map = new HashMap<>();
                for (Tuple2<Long, String> tuple : iterable) {
                    String str = map.get(tuple.f0);
                    if (str == null) {
                        map.put(tuple.f0, tuple.f1);
                    } else {
                        if (!str.equals(tuple.f1))
                            map.put(tuple.f0, str + " " + tuple.f1);
                    }
                }

                for (Tuple2<Long, String> tuple : iterable1) {
                    String str = map.get(tuple.f0);
                    if (str == null) {
                        map.put(tuple.f0, tuple.f1);
                    } else {
                        if (!str.equals(tuple.f1))
                            map.put(tuple.f0, str + " " + tuple.f1);
                    }
                }
                collector.collect(map);
            }
        }).print();
        env.execute();
    }
}
